package com.zyf.apitest.sink;

import com.zyf.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * @author Malegod_xiaofei
 * @create 2021-12-27-22:43
 */
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

//        // 从文件读取数据
//        DataStream<String> inputStream = env.readTextFile("D:\\Code\\IDEA-Project\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // 从 Kafka 读取数据
        DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));


        // 转换成 SensorReading 类型
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fileds = line.split(",");
            return new SensorReading(fileds[0], new Long(fileds[1]), new Double(fileds[2])).toString();
        });

        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        // 执行
        env.execute();
    }
}
